package com.luke.dt.stock.service;

import com.alibaba.fastjson.JSONObject;
import com.luke.dt.commons.entitys.shared.TblDtMSG;
import com.luke.dt.commons.exception.ServiceException;
import com.luke.dt.stock.dao.TblDtMsgMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * TODO
 * 这里的消息发送需要改造成事物发送或者publish confirm。
 * 并且配合mandatory属性才能保证消息不丢失（消息队列服务器再持久化期间挂掉除外）
 * 这里重申rabbitmq在极其极端情况下会消息丢失（参考UserApp的说明）
 */
@Slf4j
@Service
@Transactional(readOnly = true)
public class AmqpSenderServiceImpl implements AmqpSenderService{

    @Autowired(required = false)
    private TblDtMsgMapper tblDtMsgMapper;

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Transactional(readOnly = false)
    @Override
    public void handlesendDtMSG() throws ServiceException {
        List<TblDtMSG> dtMSGList = tblDtMsgMapper.findDtMSGList(2, 5);
        for(TblDtMSG dtMSG : dtMSGList){
            try {
                Map<String,Object> queueMsg = new HashMap<String,Object>();//封装发送给消息队列的消息
                String data = dtMSG.getData();
                queueMsg.put("msgId",dtMSG.getId());
                queueMsg.put("msgData",data);
                if(dtMSG.getQueueVal().equals("dt_queue_user")){//有些消息可能需要对data进行进一步封装
                    amqpTemplate.convertAndSend(dtMSG.getQueueVal(),JSONObject.toJSONString(queueMsg));
                }else{
                    amqpTemplate.convertAndSend(dtMSG.getQueueVal(),JSONObject.toJSONString(queueMsg));
                }
                // TODO: 2019/4/26
                //更新次数
                TblDtMSG updateDtMSG = new TblDtMSG();
                updateDtMSG.setId(dtMSG.getId());
                updateDtMSG.setUpdateTime(new Date());
                updateDtMSG.setStatusVal(2);//发送成功
                tblDtMsgMapper.updateByPrimaryKeySelective(updateDtMSG);
            } catch (AmqpException e) {
                log.error("AmqpException出现错误：{}",e.getMessage());
                TblDtMSG updateDtMSG = new TblDtMSG();
                updateDtMSG.setId(dtMSG.getId());
                updateDtMSG.setUpdateTime(new Date());
                updateDtMSG.setRetryNum(dtMSG.getRetryNum()+1);//重试次数+1
                updateDtMSG.setStatusVal(3);//发送失败
                tblDtMsgMapper.updateByPrimaryKeySelective(updateDtMSG);
            }
        }
    }
}
